#前言
我有一个需求需要使用 NodeJS 生态的包,但作为 TypeScript 的忠实用户又不想使用 NodeJS,因为开发还需要配置构建工具,尽管这些构建工具非常成熟,于是我将目光投向了 Deno。
Deno 前几年就了解过,但依稀记得当时的体验很差,没想到过几年 2.0 出来后开发体验感觉好很多,还可以直接引入 npm 的包。
决定使用 Deno 开发后,一切感觉都很新颖~~(或许不用 NodeJS 的原因就是想体验新鲜感)~~
说了一些废话后就该思考 Rust 父进程如何和 Deno 子进程进行通信,通信的方式据我个人的了解应该有如下几种
- 标准输入输出(
stdin
/stdout
/pipe
) - Unix domain socket
- 共享内存/共享文件
- 消息队列
- TCP/UDP
考虑应用场景还是使用标准输入输出来进行数据通信,感觉 Unix domain socket 更好,但是 Windows 下 Rust 的并不支持,故还是选择 stdin/stdout
#数据包设计
选择 stdin/stdout
方案有个问题就是普通的日志输出和数据输出会冲突,
一般的解决方案有:
- 针对数据输出做标记
- 劫持
console
,让日志输出也按数据输出 - 劫持
console
,让日志输出走stderr
通道
这里我选择的是第一种方案,因此需要设计一个标记用以区分日志输出和数据输出,日志输出一般是 UTF-8
编码,因此单个字节的标记容易误识别,所以选择了三个字节,数据包的设计如下:
[ 识别码: 3u8 ][ 事件码: 1u8 ][ 数据长度: 4u8 ][ 校验和: 4u8 ][ 数据: [u8] ]
头部共 12 个字节,其中识别码采用 [0x10, 0xAA, 0xFF]
这三个值并无含义,可以随机指定。
#Rust 侧实现
由于异步传染,这里采用 tokio
包
首先定义一个枚举用于表示数据的类型,这里称为事件码
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum EventCode{
Data = 0x01,
Exit = 0xF1,
Flush = 0xF2
}
这里总共定义了三个,其含义分别是:
-
Data
就不说了 -
Exit
是 Rust 通知 Deno 退出 -
Flush
我是用来处理积压的普通日志,由于定义了识别码,对于stdout
输出的数据肯定要等到完整的识别码出现进行处理,否则找不到合适的分割点,如果一直没有数据输出则会导致普通日志出现累积的情况。应该可以按识别码和换行符作为分隔符进行分割,我偷懒直接让 Deno 定时轮询发送 Flush 事件清空挤压普通日志
数据包构建
async fn send_binary_packet<W>(
writer: &mut W,
event_code: EventCode,
bytes: &[u8]
) -> anyhow::Result<()>
where
W: AsyncWrite + Unpin
{
let mut head = [0; 12];
head[0..3].copy_from_slice(&[0x10, 0xAA, 0xFF]);
head[3] = event_code as u8;
head[4..8].copy_from_slice(&(bytes.len() as u32).to_by_bytes());
head[8..12].copy_from_slice(&crc32(bytes).to_be_bytes());
let mut bufs: &mut [_] = &mut [IoSlice::new(&head), IoSlice::new(bytes)];
// writer.write_all_vectored(&mut bufs);
IoSlice::advance_slices(&mut bufs, 0);
while !bufs.is_empty() {
match writer.write_vectored(bufs).await {
Ok(0) => anyhow::bail!("..."),
Ok(n) => IoSlice::advance_slices(&mut bufs, n),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
Err(e) => return Err(e.into()),
}
}
}
启动 Deno 子进程并获取 stdin
和 stdout
管道
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
let mut deno_process = Command::new("deno")
.arg("run")
.arg("--allow-all") // deno 需要明确指定需要哪些权限
.arg("main.ts")
.stdin(Stdio::piped()) // 创建输入管道
.stdout(Stdio::stdout()) // 创建输出管道
.stderr(Stdio::inherit()) // 不处理 stderr, 让其输出到控制台
.spawn()?;
let deno_stdin = deno_process
.stdin
.take()
.ok_or(anyhow::format_err!(""))?;
let deno_stdout = deno_process
.stdout.take()
.ok_or(anyhow::format_err!(""))?;
读取 stdout
管道,由于需要使用切片进行分割,而 tokio
库中并read_until
只能使用单个字节作为分隔符而不能使用切片作为分隔符因此需要我们自行实现一个函数,可以参考 read_until
的源码实现一个 read_until_slice
let mut reader = BufReader::new(deno_stdout);
let mut buffer = Vec::new();
loop {
let bytes_read = reader.read_unitl_slice(&[0x10, 0xAA, 0xFF], &mut buffer).await?;
if bytes_read == 0 { break; } // EOF
// 读取剩下的 9 bytes 数据
{
let mut tmp = [0; 9];
reader.read_exact(&mut tmp).await?;
buffer.extend_from_slice(&tmp)
}
let pos = find_subarray(&buffer, &[0x10, 0xAA, 0xFF]);
// 处理识别码前的数据(即 console 输出)
if pos > 0 {
let console_output = &buffer[..pos];
io::stdout().write_all(console_output).await?;
buffer.drain(..pos);
}
// 处理二进制包(剩余部分)
let head = &buffer[..12];
let event_code = head[3];
let data_len = u32::from_be_bytes((&head[4..8]).try_into()?);
let checksum = u32::from_be_bytes((&head[8..12]).try_into()?);
buffer.drain(..12);
// 忽略该事件
if event_code == EventCode::Flush as u8 {
continue;
}
// 剩下来就从 reader 读取 data_len 字节数的数据了和校验数据完整性了
}
#Deno 侧实现
从 stdin
中读取数据
EventStream
: event-stream.ts
const events = Deno.stdin.readable.pipeThrough(new EventStream());
for await (const [evetCode, data] of events){
...
}
发送数据到 stdout
export const sendBinaryPacket = async (
eventCode: EventCode,
data: Uint8Array,
) => {
const head = new Uint8Array(12);
head.set([0x10, 0xAA, 0xFF], 0);
const view = new DataView(head.buffer);
view.setUint8(3, eventCode);
view.setUint32(4, data.length);
view.setUint32(8, crc32(data));
const packet = new Uint8Array(head.length + data.length);
packet.set(head, 0);
packet.set(data, head.length);
try {
await Deno.stdout.write(packet);
} catch (e) {
if (e instanceof Deno.errors.BrokenPipe) {
logger.error('Stdout pipe unexpectedly closed');
Deno.exit();
}
throw e;
}
};